// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use chrono::DateTime;
use chrono::Days;
use chrono::Duration;
use chrono::Utc;
use databend_common_base::base::uuid::Uuid;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
use databend_common_meta_app::schema::LeastVisibleTime;
use databend_common_meta_app::schema::ListIndexesByIdReq;
use databend_common_storages_fuse::io::MetaReaders;
use databend_common_storages_fuse::io::SegmentsIO;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
use databend_common_storages_fuse::FuseTable;
use databend_storages_common_cache::CacheAccessor;
use databend_storages_common_cache::CacheManager;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_io::Files;
use databend_storages_common_table_meta::meta::uuid_from_date_time;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::VACUUM2_OBJECT_KEY_PREFIX;
use futures_util::TryStreamExt;
use log::info;
use opendal::Entry;
use opendal::ErrorKind;
use opendal::Operator;
use opendal::Scheme;
use uuid::Version;

/// The assumed maximum duration between the moment the first block of a transaction is written
/// and the moment the transaction's snapshot is written.
///
/// This assumption exists to cope with rolling upgrades, during which some nodes may still run a
/// version that does not include the vacuum2 logic. It is used in two places:
///
/// - When determining whether a snapshot object generated by an old version node can be cleaned up
///
///   Snapshots whose object key does not start with `VACUUM2_OBJECT_KEY_PREFIX` are all created by
///   nodes of previous versions (which do not support vacuum2). For such snapshot objects, if
///   their timestamp is less than
///   `GC_root's timestamp - ASSUMPTION_MAX_TXN_DURATION`
///   we consider them safe to delete.
///
///   Generally speaking, if a snapshot from an old version was created a sufficiently long time
///   before the gc root, it would not be successfully committed after the gc root; this way, we
///   avoid deleting a snapshot object produced by an ongoing (not yet committed) transaction.
///
/// - When determining whether a segment/block object generated by an old version query node can be
///   cleaned up
///
///   Similarly, if a segment/block was created a sufficiently long time before the gc root and
///   is not referenced by the gc root, then it will not be referenced by any snapshot that can be
///   successfully committed after the gc root, and is therefore safe to delete.
///
/// NOTE:
///   If this assumption does not hold, table data may become inaccessible:
///   snapshots may become unreadable, or some data may become unavailable.
///
///   Once the entire cluster has been upgraded to a version that includes the vacuum2 logic,
///   the above risks no longer exist.
const ASSUMPTION_MAX_TXN_DURATION: Duration = Duration::days(3);
/// Vacuums a fuse table: removes the snapshot, segment, block and index objects that are no
/// longer reachable from the selected gc root snapshot.
///
/// The procedure is:
///
/// 1. Set the table's LVT (least visible time) via `set_lvt`.
/// 2. List the snapshot objects located before the LVT (or, when the retention period is 0 /
///    the table is transient, before the table's current snapshot).
/// 3. Select the gc root snapshot and the snapshots that precede it (`select_gc_root`).
/// 4. List the segments and blocks located before the gc root, and keep only those that are
///    not referenced by the gc root.
/// 5. Derive the aggregating / inverted / bloom index locations of the blocks to be removed.
/// 6. Remove indexes, then segments and blocks, then snapshots (this order matters, see the
///    comment next to the removal calls).
///
/// Progress of each step is reported through `ctx.set_status_info`.
///
/// # Arguments
///
/// * `table` - the table to vacuum; must be convertible by `FuseTable::try_from_table`.
/// * `ctx` - table context, used for settings, catalog access, status reporting and IO.
/// * `respect_flash_back` - forwarded to `select_gc_root`, where it selects the strategy used
///   to locate the gc root when the retention period is not 0.
///
/// # Returns
///
/// The paths of all removed objects: segments and blocks first, then snapshots, then indexes.
/// An empty vector is returned when vacuuming is skipped (no LVT could be set, or no gc root
/// could be selected).
///
/// # Errors
///
/// Propagates errors from the catalog, the storage operator and the meta readers.
///
/// # Panics
///
/// Panics if the selected gc root snapshot carries no timestamp.
#[async_backtrace::framed]
pub async fn do_vacuum2(
    table: &dyn Table,
    ctx: Arc<dyn TableContext>,
    respect_flash_back: bool,
) -> Result<Vec<String>> {
    let fuse_table = FuseTable::try_from_table(table)?;
    let start = std::time::Instant::now();

    // Transient tables keep no history, so they are always vacuumed as if retention were 0.
    let retention_period_in_days = if fuse_table.is_transient() {
        0
    } else {
        ctx.get_settings().get_data_retention_time_in_days()?
    };

    let is_vacuum_all = retention_period_in_days == 0;

    let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period_in_days).await? else {
        return Ok(vec![]);
    };

    ctx.set_status_info(&format!(
        "set lvt for table {} takes {:?}, lvt: {:?}",
        fuse_table.get_table_info().desc,
        start.elapsed(),
        lvt
    ));

    let start = std::time::Instant::now();
    let snapshots_before_lvt = if is_vacuum_all {
        list_until_prefix(
            fuse_table,
            fuse_table
                .meta_location_generator()
                .snapshot_location_prefix(),
            // `set_lvt` returned `Some`, i.e. it was able to read the table's current snapshot,
            // so the snapshot location is expected to be present here.
            fuse_table.snapshot_loc().unwrap().as_str(),
            true,
            None,
        )
        .await?
    } else {
        list_until_timestamp(
            fuse_table,
            fuse_table
                .meta_location_generator()
                .snapshot_location_prefix(),
            lvt,
            true,
            None,
        )
        .await?
    };

    let elapsed = start.elapsed();
    ctx.set_status_info(&format!(
        "list snapshots before lvt for table {} takes {:?}, snapshots_dir: {:?}, lvt: {:?}, snapshots: {:?}",
        fuse_table.get_table_info().desc,
        elapsed,
        fuse_table.meta_location_generator().snapshot_location_prefix(),
        lvt,
        slice_summary(&snapshots_before_lvt)
    ));

    let start = std::time::Instant::now();
    let Some((gc_root, snapshots_to_gc, gc_root_meta_ts)) = select_gc_root(
        fuse_table,
        &snapshots_before_lvt,
        is_vacuum_all,
        respect_flash_back,
        ctx.clone().get_abort_checker(),
        lvt,
    )
    .await?
    else {
        return Ok(vec![]);
    };
    ctx.set_status_info(&format!(
        "select gc_root for table {} takes {:?}, gc_root: {:?}, snapshots_to_gc: {:?}",
        fuse_table.get_table_info().desc,
        start.elapsed(),
        gc_root,
        slice_summary(&snapshots_to_gc)
    ));

    let start = std::time::Instant::now();
    let gc_root_timestamp = gc_root.timestamp.unwrap();
    // Segment paths referenced by the gc root; these must survive the vacuum.
    let gc_root_segments = gc_root
        .segments
        .iter()
        .map(|(path, _)| path)
        .collect::<HashSet<_>>();
    let segments_before_gc_root = list_until_timestamp(
        fuse_table,
        fuse_table
            .meta_location_generator()
            .segment_location_prefix(),
        gc_root_timestamp,
        false,
        Some(gc_root_meta_ts),
    )
    .await?
    .into_iter()
    .map(|v| v.path().to_owned())
    .collect::<Vec<_>>();

    ctx.set_status_info(&format!(
        "list segments before gc_root for table {} takes {:?}, segment_dir: {:?}, gc_root_timestamp: {:?}, segments: {:?}",
        fuse_table.get_table_info().desc,
        start.elapsed(),
        fuse_table.meta_location_generator().segment_location_prefix(),
        gc_root_timestamp,
        slice_summary(&segments_before_gc_root)
    ));

    let start = std::time::Instant::now();
    let segments_to_gc: Vec<String> = segments_before_gc_root
        .into_iter()
        .filter(|s| !gc_root_segments.contains(s))
        .collect();
    ctx.set_status_info(&format!(
        "Filter segments to gc for table {} takes {:?}, segments_to_gc: {:?}",
        fuse_table.get_table_info().desc,
        start.elapsed(),
        slice_summary(&segments_to_gc)
    ));

    let start = std::time::Instant::now();
    let segments_io =
        SegmentsIO::create(ctx.clone(), fuse_table.get_operator(), fuse_table.schema());
    let segments = segments_io
        .read_segments::<Arc<CompactSegmentInfo>>(&gc_root.segments, false)
        .await?;
    // Block paths referenced by the gc root's segments; these must survive the vacuum.
    let mut gc_root_blocks = HashSet::new();
    for segment in segments {
        gc_root_blocks.extend(segment?.block_metas()?.iter().map(|b| b.location.0.clone()));
    }
    ctx.set_status_info(&format!(
        "read segments for table {} takes {:?}",
        fuse_table.get_table_info().desc,
        start.elapsed(),
    ));

    let start = std::time::Instant::now();
    let blocks_before_gc_root = list_until_timestamp(
        fuse_table,
        fuse_table.meta_location_generator().block_location_prefix(),
        gc_root_timestamp,
        false,
        Some(gc_root_meta_ts),
    )
    .await?
    .into_iter()
    .map(|v| v.path().to_owned())
    .collect::<Vec<_>>();

    ctx.set_status_info(&format!(
        "list blocks before gc_root for table {} takes {:?}, block_dir: {:?}, gc_root_timestamp: {:?}, blocks: {:?}",
        fuse_table.get_table_info().desc,
        start.elapsed(),
        fuse_table.meta_location_generator().block_location_prefix(),
        gc_root_timestamp,
        slice_summary(&blocks_before_gc_root)
    ));

    let start = std::time::Instant::now();
    let blocks_to_gc: Vec<String> = blocks_before_gc_root
        .into_iter()
        .filter(|b| !gc_root_blocks.contains(b))
        .collect();
    ctx.set_status_info(&format!(
        "Filter blocks to gc for table {} takes {:?}, blocks_to_gc: {:?}",
        fuse_table.get_table_info().desc,
        start.elapsed(),
        slice_summary(&blocks_to_gc)
    ));

    let start = std::time::Instant::now();
    let catalog = ctx.get_default_catalog()?;
    let table_agg_index_ids = catalog
        .list_index_ids_by_table_id(ListIndexesByIdReq::new(
            ctx.get_tenant(),
            fuse_table.get_id(),
        ))
        .await?;
    let inverted_indexes = &fuse_table.get_table_info().meta.indexes;
    // Per block: one location per aggregating index, one per inverted index, plus one bloom index.
    let mut indexes_to_gc = Vec::with_capacity(
        blocks_to_gc.len() * (table_agg_index_ids.len() + inverted_indexes.len() + 1),
    );
    for loc in &blocks_to_gc {
        for index_id in &table_agg_index_ids {
            indexes_to_gc.push(
                TableMetaLocationGenerator::gen_agg_index_location_from_block_location(
                    loc, *index_id,
                ),
            );
        }
        for idx in inverted_indexes.values() {
            indexes_to_gc.push(
                TableMetaLocationGenerator::gen_inverted_index_location_from_block_location(
                    loc,
                    idx.name.as_str(),
                    idx.version.as_str(),
                ),
            );
        }
        indexes_to_gc
            .push(TableMetaLocationGenerator::gen_bloom_index_location_from_block_location(loc));
    }

    ctx.set_status_info(&format!(
        "collect indexes to gc for table {} takes {:?}, indexes_to_gc: {:?}",
        fuse_table.get_table_info().desc,
        start.elapsed(),
        slice_summary(&indexes_to_gc)
    ));

    let start = std::time::Instant::now();
    let subject_files_to_gc: Vec<_> = segments_to_gc.into_iter().chain(blocks_to_gc).collect();
    let op = Files::create(ctx.clone(), fuse_table.get_operator());

    // order is important
    // indexes should be removed before blocks, because index locations to gc are generated from block locations
    // subject_files should be removed before snapshots, because gc of subject_files depend on gc root
    op.remove_file_in_batch(&indexes_to_gc).await?;
    op.remove_file_in_batch(&subject_files_to_gc).await?;

    // Evict snapshot caches from the local node.
    //
    // Note:
    // - Cached snapshots may also exist on other nodes in a multi-node cluster. If these remote
    //   caches are not synchronized, it could lead to incorrect results in operations like
    //   `fuse_snapshot(...)`. However, this does not compromise the safety of the table data.
    // - TODO: To ensure correctness in such cases, the table's Least Visible Timestamp (LVT),
    //   stored in the meta-server, should be utilized to determine snapshot visibility and
    //   resolve potential issues.

    if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() {
        for path in &snapshots_to_gc {
            snapshot_cache.evict(path);
        }
    }

    op.remove_file_in_batch(&snapshots_to_gc).await?;

    let files_to_gc: Vec<_> = subject_files_to_gc
        .into_iter()
        .chain(snapshots_to_gc)
        .chain(indexes_to_gc)
        .collect();
    ctx.set_status_info(&format!(
        "remove files for table {} takes {:?}, files_to_gc: {:?}",
        fuse_table.get_table_info().desc,
        start.elapsed(),
        slice_summary(&files_to_gc)
    ));
    Ok(files_to_gc)
}

/// Try set lvt as min(latest_snapshot.timestamp, now - retention_time).
///
/// Return `None` means we stop vacuumming, but don't want to report error to user.
async fn set_lvt(
    fuse_table: &FuseTable,
    ctx: &dyn TableContext,
    retention: u64,
) -> Result<Option<DateTime<Utc>>> {
    let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
        info!(
            "Table {} has no snapshot, stop vacuuming",
            fuse_table.get_table_info().desc
        );
        return Ok(None);
    };
    if !is_uuid_v7(&latest_snapshot.snapshot_id) {
        info!(
            "latest snapshot {:?} is not v7, stop vacuuming",
            latest_snapshot
        );
        return Ok(None);
    }
    let cat = ctx.get_default_catalog()?;
    // safe to unwrap, as we have checked the version is v5
    let latest_ts = latest_snapshot.timestamp.unwrap();
    let lvt_point_candidate = std::cmp::min(Utc::now() - Days::new(retention), latest_ts);

    let lvt_point = cat
        .set_table_lvt(
            &LeastVisibleTimeIdent::new(ctx.get_tenant(), fuse_table.get_id()),
            &LeastVisibleTime::new(lvt_point_candidate),
        )
        .await?
        .time;
    Ok(Some(lvt_point))
}

/// Returns `true` when `uuid` reports `Version::SortRand` as its version, and `false` for any
/// other version or when no version can be determined.
fn is_uuid_v7(uuid: &Uuid) -> bool {
    matches!(uuid.get_version(), Some(Version::SortRand))
}

/// Lists the entries under `path` whose object key sorts before `until`.
///
/// The work is delegated to `fs_list_until_prefix` when the table's operator uses the `Fs`
/// scheme, and to `general_list_until_prefix` for every other scheme. All arguments are passed
/// through unchanged.
async fn list_until_prefix(
    fuse_table: &FuseTable,
    path: &str,
    until: &str,
    need_one_more: bool,
    gc_root_meta_ts: Option<DateTime<Utc>>,
) -> Result<Vec<Entry>> {
    info!("list until prefix: {}", until);
    let operator = fuse_table.get_operator_ref();
    let backed_by_fs = matches!(operator.info().scheme(), Scheme::Fs);

    if backed_by_fs {
        fs_list_until_prefix(operator, path, until, need_one_more, gc_root_meta_ts).await
    } else {
        general_list_until_prefix(operator, path, until, need_one_more, gc_root_meta_ts).await
    }
}

/// Streams the entries under `path` and collects those whose key sorts before `until`.
///
/// Object storage supported by Databend is expected to return entries sorted in ascending
/// lexicographical order by object key, so listing stops at the first entry whose path is
/// `>= until`. Databend leverages this property to enhance the efficiency and thoroughness of
/// the vacuum process; the safety of the vacuum algorithm does not depend on this ordering.
///
/// * Directories are skipped.
/// * When `need_one_more` is set, the first entry at or past `until` is included as well.
/// * When `gc_root_meta_ts` is given, entries before `until` are kept only if
///   `is_gc_candidate_segment_block` accepts them.
async fn general_list_until_prefix(
    dal: &Operator,
    path: &str,
    until: &str,
    need_one_more: bool,
    gc_root_meta_ts: Option<DateTime<Utc>>,
) -> Result<Vec<Entry>> {
    let mut lister = dal.lister(path).await?;
    let mut selected = Vec::new();

    while let Some(entry) = lister.try_next().await? {
        if entry.metadata().is_dir() {
            continue;
        }

        let reached_boundary = entry.path() >= until;
        if reached_boundary {
            info!("entry path: {} >= until: {}", entry.path(), until);
            if need_one_more {
                selected.push(entry);
            }
            break;
        }

        // Without a gc root timestamp every entry before the boundary qualifies.
        let keep = match gc_root_meta_ts {
            None => true,
            Some(ts) => is_gc_candidate_segment_block(&entry, dal, ts).await?,
        };
        if keep {
            selected.push(entry);
        }
    }

    Ok(selected)
}

/// Variant of `general_list_until_prefix` for storage backed by the local file system.
///
/// Here thoroughness is prioritized over efficiency (though the efficiency loss is usually
/// not significant): all file entries under `path` are fetched and sorted by path first, and
/// only then are the entries before `until` extracted.
///
/// * When `need_one_more` is set, the first entry at or past `until` is included as well.
/// * When `gc_root_meta_ts` is given, entries before `until` are kept only if
///   `is_gc_candidate_segment_block` accepts them.
///
/// NOTE(review): the listing goes through the operator's blocking API inside an async fn —
/// confirm this is intended.
async fn fs_list_until_prefix(
    dal: &Operator,
    path: &str,
    until: &str,
    need_one_more: bool,
    gc_root_meta_ts: Option<DateTime<Utc>>,
) -> Result<Vec<Entry>> {
    // Step 1: gather every file entry, then order them lexicographically by path.
    let blocking_lister = dal.blocking().lister(path)?;
    let mut files = Vec::new();
    for listed in blocking_lister {
        let entry = listed?;
        if entry.metadata().is_file() {
            files.push(entry);
        }
    }
    files.sort_by(|lhs, rhs| lhs.path().cmp(rhs.path()));

    // Step 2: walk the sorted entries and stop at the `until` boundary.
    let mut selected = Vec::new();
    for entry in files {
        let reached_boundary = entry.path() >= until;
        if reached_boundary {
            info!("entry path: {} >= until: {}", entry.path(), until);
            if need_one_more {
                selected.push(entry);
            }
            break;
        }

        // Without a gc root timestamp every entry before the boundary qualifies.
        let keep = match gc_root_meta_ts {
            None => true,
            Some(ts) => is_gc_candidate_segment_block(&entry, dal, ts).await?,
        };
        if keep {
            selected.push(entry);
        }
    }

    Ok(selected)
}

/// Decides whether a listed segment/block entry may be considered for garbage collection.
///
/// * An entry whose file name (the part after the last `/`) starts with
///   `VACUUM2_OBJECT_KEY_PREFIX` is always a candidate.
/// * Any other entry is a candidate only if
///   `last_modified + ASSUMPTION_MAX_TXN_DURATION < gc_root_meta_ts`.
///
/// `last_modified` is taken from the listed metadata when present; otherwise the object is
/// `stat`-ed through `op`.
///
/// # Errors
///
/// Fails if the `stat` call fails, or if `last_modified` is unavailable even after `stat`.
async fn is_gc_candidate_segment_block(
    entry: &Entry,
    op: &Operator,
    gc_root_meta_ts: DateTime<Utc>,
) -> Result<bool> {
    let object_key = entry.path();
    let file_name = object_key
        .rsplit_once('/')
        .map_or(object_key, |(_, name)| name);
    if file_name.starts_with(VACUUM2_OBJECT_KEY_PREFIX) {
        return Ok(true);
    }

    let last_modified = match entry.metadata().last_modified() {
        Some(ts) => ts,
        // The listing did not carry the modification time; ask the storage for it.
        None => op.stat(object_key).await?.last_modified().ok_or_else(|| {
            ErrorCode::StorageOther(format!(
                "Failed to get `last_modified` metadata of the entry '{}'",
                object_key
            ))
        })?,
    };

    Ok(last_modified + ASSUMPTION_MAX_TXN_DURATION < gc_root_meta_ts)
}

/// Lists the entries under `path` located before the timestamp `until`.
///
/// The timestamp is converted into an object-key boundary of the form
/// `<path><VACUUM2_OBJECT_KEY_PREFIX><timestamp component>` and the listing itself is delegated
/// to `list_until_prefix`; `need_one_more` and `gc_root_meta_ts` are passed through unchanged.
async fn list_until_timestamp(
    fuse_table: &FuseTable,
    path: &str,
    until: DateTime<Utc>,
    need_one_more: bool,
    gc_root_meta_ts: Option<DateTime<Utc>>,
) -> Result<Vec<Entry>> {
    let boundary_uuid = uuid_from_date_time(until).simple().to_string();

    // The most significant 48 bits of the uuid, i.e. its first 12 hex characters.
    let timestamp_hex = &boundary_uuid[..12];
    let boundary_key = format!("{}{}{}", path, VACUUM2_OBJECT_KEY_PREFIX, timestamp_hex);

    list_until_prefix(
        fuse_table,
        path,
        &boundary_key,
        need_one_more,
        gc_root_meta_ts,
    )
    .await
}

/// Reads the table snapshot stored at `path` using the table's operator.
///
/// The snapshot format version is derived from `path`. The load parameters pass no length
/// hint and set `put_cache` to `false`.
async fn read_snapshot_from_location(
    fuse_table: &FuseTable,
    path: &str,
) -> Result<Arc<TableSnapshot>> {
    let version = TableMetaLocationGenerator::snapshot_version(path);
    info!("read snapshot from location: {}, version: {}", path, version);

    let load_params = LoadParams {
        location: path.to_owned(),
        len_hint: None,
        ver: version,
        put_cache: false,
    };
    MetaReaders::table_snapshot_reader(fuse_table.get_operator())
        .read(&load_params)
        .await
}

/// Selects the gc root snapshot and the snapshot objects that may be removed.
///
/// The location of the gc root is determined by one of three strategies:
///
/// * `is_vacuum_all`: the table's current snapshot location.
/// * `respect_flash_back`: the location yielded by `fuse_table.find`, started from the table's
///   current snapshot with a predicate that accepts snapshots whose timestamp is `<= lvt`.
/// * otherwise: the last entry of `snapshots_before_lvt` is read as an anchor, and the
///   snapshot named by its `prev_snapshot_id` becomes the gc root (which must have a v7 uuid).
///
/// The snapshots to gc are the gc candidates that precede the gc root. A listed snapshot is a
/// candidate if its file name starts with `VACUUM2_OBJECT_KEY_PREFIX`, or if it is old enough
/// with respect to `ASSUMPTION_MAX_TXN_DURATION` and the gc root object's `last_modified`.
///
/// # Returns
///
/// * `Ok(Some((gc_root, snapshots_to_gc, gc_root_meta_ts)))` on success, where
///   `gc_root_meta_ts` is the `last_modified` time of the gc root object.
/// * `Ok(None)` when vacuuming should stop silently: no gc root can be determined, the gc
///   root object no longer exists (concurrent vacuum), or the gc root snapshot cannot be read.
///
/// # Errors
///
/// Fails on storage errors other than `NotFound` for the gc root object, on missing
/// `last_modified` metadata, or if the gc root is not among the gc candidates.
///
/// # Panics
///
/// Panics if the table has no snapshot location in the first two strategies.
async fn select_gc_root(
    fuse_table: &FuseTable,
    snapshots_before_lvt: &[Entry],
    is_vacuum_all: bool,
    respect_flash_back: bool,
    abort_checker: AbortChecker,
    lvt: DateTime<Utc>,
) -> Result<Option<(Arc<TableSnapshot>, Vec<String>, DateTime<Utc>)>> {
    let gc_root_path = if is_vacuum_all {
        // safe to unwrap, or we should have stopped vacuuming in set_lvt()
        fuse_table.snapshot_loc().unwrap()
    } else if respect_flash_back {
        let latest_location = fuse_table.snapshot_loc().unwrap();
        let gc_root = fuse_table
            .find(latest_location, abort_checker, |snapshot| {
                snapshot.timestamp.is_some_and(|ts| ts <= lvt)
            })
            .await?
            .snapshot_loc();
        let Some(gc_root) = gc_root else {
            info!("no gc_root found, stop vacuuming");
            return Ok(None);
        };
        gc_root
    } else {
        let Some(anchor_entry) = snapshots_before_lvt.last() else {
            info!("no snapshots before lvt, stop vacuuming");
            return Ok(None);
        };
        let anchor = read_snapshot_from_location(fuse_table, anchor_entry.path()).await?;
        let Some((gc_root_id, gc_root_ver)) = anchor.prev_snapshot_id else {
            info!("anchor has no prev_snapshot_id, stop vacuuming");
            return Ok(None);
        };
        let gc_root_path = fuse_table
            .meta_location_generator()
            .snapshot_location_from_uuid(&gc_root_id, gc_root_ver)?;
        if !is_uuid_v7(&gc_root_id) {
            info!("gc_root {} is not v7", gc_root_path);
            return Ok(None);
        }
        gc_root_path
    };

    let dal = fuse_table.get_operator_ref();
    // The read result is inspected only after the `stat` below, so that a gc root object that
    // has vanished is handled by the `NotFound` branch.
    let gc_root = read_snapshot_from_location(fuse_table, &gc_root_path).await;

    let gc_root_meta_ts = match dal.stat(&gc_root_path).await {
        Ok(v) => v.last_modified().ok_or_else(|| {
            ErrorCode::StorageOther(format!(
                "Failed to get `last_modified` metadata of the gc root object '{}'",
                gc_root_path
            ))
        })?,
        Err(e) => {
            return if e.kind() == ErrorKind::NotFound {
                // Concurrent vacuum, ignore it
                Ok(None)
            } else {
                Err(e.into())
            };
        }
    };

    match gc_root {
        Ok(gc_root) => {
            info!("gc_root found: {:?}", gc_root);
            let mut gc_candidates = Vec::with_capacity(snapshots_before_lvt.len());

            for snapshot in snapshots_before_lvt.iter() {
                let path = snapshot.path();
                let last_part = path.rsplit('/').next().unwrap();
                if last_part.starts_with(VACUUM2_OBJECT_KEY_PREFIX) {
                    gc_candidates.push(path.to_owned());
                } else {
                    // This snapshot is created by a node of the previous version which does not
                    // support vacuum2, we rely on the `ASSUMPTION_MAX_TXN_DURATION` to identify if
                    // it is available to be vacuumed.
                    let last_modified = match snapshot.metadata().last_modified() {
                        None => dal.stat(path).await?.last_modified().ok_or_else(|| {
                            ErrorCode::StorageOther(format!(
                                "Failed to get `last_modified` metadata of the snapshot object '{}'",
                                path
                            ))
                        })?,
                        Some(v) => v,
                    };
                    if last_modified + ASSUMPTION_MAX_TXN_DURATION < gc_root_meta_ts {
                        gc_candidates.push(path.to_owned());
                    }
                }
            }

            // The candidates keep the order of `snapshots_before_lvt`, which is expected to be
            // sorted ascending by path; `binary_search` relies on that ordering.
            let gc_root_idx = gc_candidates.binary_search(&gc_root_path).map_err(|_| {
                ErrorCode::Internal(format!(
                    "gc root path {} should be one of the candidates, candidates: {:?}",
                    gc_root_path, gc_candidates
                ))
            })?;
            // Only the candidates strictly before the gc root are removed; the gc root stays.
            let snapshots_to_gc = gc_candidates[..gc_root_idx].to_vec();

            Ok(Some((gc_root, snapshots_to_gc, gc_root_meta_ts)))
        }
        Err(e) => {
            info!("read gc_root {} failed: {:?}", gc_root_path, e);
            Ok(None)
        }
    }
}

/// Renders a slice for status/log messages.
///
/// Slices with at most 10 elements are printed in full using their `Debug` representation.
/// Longer slices are abbreviated to their first five elements, their last five elements and
/// the total length.
fn slice_summary<T: std::fmt::Debug>(s: &[T]) -> String {
    const EDGE: usize = 5;

    let total = s.len();
    if total <= 2 * EDGE {
        return format!("{:?}", s);
    }

    let (head, tail) = (&s[..EDGE], &s[total - EDGE..]);
    format!(
        "First five: {:?}, Last five: {:?},Len: {}",
        head, tail, total
    )
}
